package gecko;

import gecko.lang.JSONObject;
import gecko.lang.Reactor;
import gecko.lang.TypedMap;
import gecko.x.ClassX;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;

import static gecko.x.ClassX.nameOf;

/**
 * Engine implementation that loads plugins, device pipelines, interceptors, drivers,
 * virtual devices and triggers, and dispatches trigger events through a three-stage
 * {@link Reactor} chain: interceptors, then drivers, then output.
 *
 * <p>Typical lifecycle as exposed by this class: {@link #init(Map)}, {@link #start()},
 * {@link #shutdown()}.
 *
 * @author 陈永佳 (yoojiachen@gmail.com)
 * @version 0.0.1
 */
public class GeckoEngine extends GenericEngine {

    private static final Logger LOGGER = LoggerFactory.getLogger(GeckoEngine.class);

    /**
     * Wraps global helper methods and parameters.
     * GeckoScoped gives components and the application access to global parameters
     * and utility methods.
     */
    private final GeckoScoped mScoped = new GeckoScoped();

    /**
     * Lookup adapter that lets a Driver find the matching Pipeline; delegates to
     * {@code mPipelines.get(...)}.
     */
    private final PipelineSelector mPipelineSelector = mPipelines::get;

    /**
     * Reactor event queue that runs the staged handlers registered in the constructor.
     * Its constructor argument is read from the system property
     * {@code gecko.dispatcher.capacity} (default {@code "32"}).
     */
    private final Reactor<GeckoContext> mReactor = new Reactor<>(
            Integer.parseInt(System.getProperty("gecko.dispatcher.capacity", "32")));

    /**
     * Distributed ID generator. Created in {@link #init(Map)}; it is {@code null} before that.
     */
    private SnowflakeId mSnowflakeId;

    /**
     * Dispatches a Trigger event.
     * The incoming event is wrapped in a {@link GeckoContext} and handed to the internal
     * Reactor, which runs it through the registered stages; once processing completes,
     * the outbound data is passed back to the trigger through the callback.
     */
    private final VirtualTrigger.ContextInvoker mInvoker = (event, callback) -> {
        final GeckoContext context = new GeckoContext(event.topic, new Inbound(event.data)) {
            @Override
            void onCompleted(Outbound outbound) {
                callback.onComplete(event.topic, outbound.data());
            }
        };
        context.setContextId(mSnowflakeId.nextId());
        try {
            mReactor.emmitStart(context);
        } catch (InterruptedException ex) {
            // Restore the interrupt status so code further up the stack can still observe
            // the interruption; swallowing it would hide the stop request.
            Thread.currentThread().interrupt();
            // NOTE(review): on this path the event is not dispatched and the callback is
            // never completed - confirm whether the trigger should be notified of the failure.
            LOGGER.error("Trigger.Invoker派发事件出错", ex);
        }
    };

    /**
     * Creates the engine and registers the three Reactor stages in order:
     * interceptor dispatch, driver dispatch, and output.
     */
    public GeckoEngine() {
        // Stage 1: interceptor dispatch
        mReactor.addLast((idx, context) -> {
            context.addAttribute("Process.Interceptor.Start", context.escaped() + "ms");
            final String topic = context.topic();
            mScoped.DebugIfV(LOGGER, "拦截器调度处理, Topic: {}", topic);

            // Only interceptors whose topics match the event are applied, ordered by
            // ascending priority value.
            final Collection<Interceptor> accepted = mInterceptors.stream()
                    .filter(it -> isTopicMatches(it, topic))
                    .sorted(Comparator.comparingInt(Interceptor::getPriority))
                    .collect(Collectors.toList());

            for (Interceptor interceptor : accepted) {
                final String name = nameOf(interceptor);
                mScoped.DebugIfV(LOGGER, "拦截器处理中，Interceptor: {}", name);
                try {
                    interceptor.handle(context, mScoped);
                } catch (DropException de) {
                    // Drop: abort the chain, write an error payload to the outbound and
                    // emit the end of processing instead of the next stage.
                    mScoped.DebugIfV(LOGGER, "拦截器抛弃事件，Interceptor: {}", name);
                    context.outbound().addDataFields(JSONObject.error("INTERCEPTOR_DROPPED"));
                    mReactor.emmitEnd(context);
                    return;
                } catch (Exception ex) {
                    // Any other failure is logged and the remaining interceptors still run.
                    LOGGER.error("拦截器抛出无法处理错误", ex);
                }
            }
            context.addAttribute("Process.Interceptor.End", context.escaped() + "ms");
            mReactor.emmitNext(idx, context);
        });

        // Stage 2: user driver dispatch
        mReactor.addLast((idx, context) -> {
            context.addAttribute("Process.Driver.Start", context.escaped() + "ms");
            final String topic = context.topic();
            mScoped.DebugIfV(LOGGER, "驱动调度处理, Topic: {}", topic);

            final Collection<Driver> drivers = mDrivers.stream()
                    .filter(it -> isTopicMatches(it, topic))
                    .collect(Collectors.toList());

            for (Driver driver : drivers) {
                mScoped.DebugIfV(LOGGER, "用户驱动处理中，Driver: {}", nameOf(driver));
                try {
                    driver.handle(context, mPipelineSelector, mScoped);
                } catch (DropException de) {
                    // Drivers are not allowed to drop events: log it and continue.
                    LOGGER.error("用户驱动不支持Drop操作", de);
                } catch (Exception ex) {
                    LOGGER.error("用户驱动抛出无法处理错误", ex);
                }
            }
            context.addAttribute("Process.Driver.End", context.escaped() + "ms");
            mReactor.emmitNext(idx, context);
        });

        // Stage 3: output - log the collected attributes and complete the context
        mReactor.addLast((index, context) -> {
            context.attributes().forEach((k, v) -> {
                mScoped.DebugIfV(LOGGER, "{} = {}", k, v);
            });
            context.onCompleted(context.outbound());
        });
    }

    /**
     * Initializes the engine.
     * Loads and initializes every component from the given configuration: first the
     * components discovered through {@link ServiceLoader} (SPI), then the components
     * declared by class name in the configuration, and finally logs a summary of what
     * was loaded.
     *
     * @param appConfig global configuration
     */
    public void init(Map<String, Object> appConfig) {
        LOGGER.info("Engine初始化...");
        mScoped.init(TypedMap.wrap(appConfig));

        mSnowflakeId = new SnowflakeId(mScoped.__workerId(), mScoped.__dataCenterId());

        //// Load and initialize components through the SPI mechanism

        // Plugins
        ServiceLoader.load(VirtualPlugin.class).forEach(this::addPlugin);
        initSPIBundles(mScoped, mPlugins,
                "Plugin.Init", (mod, conf) -> {
                    // no config
                },
                k -> getSPIConf(mScoped.PluginsConfigs, k),
                mPlugins::removeAll);

        // Device pipelines, each initialized with the router config of its protocol
        ServiceLoader.load(DevicePipeline.class).forEach(this::addDevicePipeline);
        mPipelines.forEach((protocol, group) -> {
            LOGGER.info("协议：{}, DevicePipeline: {}", protocol, ClassX.classNameOf(group));
            group.onInit(mScoped.RouterConfig.getDictMap(protocol), mScoped);
        });

        // Interceptors: priority and topics come from the configuration
        ServiceLoader.load(Interceptor.class).forEach(this::addInterceptor);
        initSPIBundles(mScoped, mInterceptors,
                "Interceptor.Init", (mod, conf) -> {
                    mod.setPriority(conf.priority);
                    mod.setTopics(conf.topics);
                },
                k -> getSPIConf(mScoped.InterceptorConfigs, k),
                mInterceptors::removeAll);

        // User drivers: topics come from the configuration
        ServiceLoader.load(Driver.class).forEach(this::addDriver);
        initSPIBundles(mScoped, mDrivers,
                "Driver.Init", (mod, conf) -> {
                    mod.setTopics(conf.topics);
                },
                k -> getSPIConf(mScoped.DriversConfigs, k),
                mDrivers::removeAll);

        // Virtual devices: collected into a local list first, then registered with the
        // engine from the per-device configuration callback.
        final List<VirtualDevice> spiDevices = new ArrayList<>();
        ServiceLoader.load(VirtualDevice.class).forEach(spiDevices::add);
        initSPIBundles(mScoped, spiDevices,
                "DevicePipeline.Init",
                (mod, conf) -> {
                    checkAndSetHardware(mod, conf);
                    addVirtualDevice(mod);
                },
                k -> getSPIConf(mScoped.DevicesConfigs, k),
                // Removal callback: detach each given device from the pipeline of its protocol.
                items -> items.forEach(dev -> {
                    final DevicePipeline pipeline = mPipelines.get(dev.getProtocolName());
                    if (pipeline != null) {
                        pipeline.removeVirtualDevice(dev);
                    }
                }));

        // Triggers
        ServiceLoader.load(VirtualTrigger.class).forEach(this::addTrigger);
        initSPIBundles(mScoped, mTriggers,
                "Trigger.Init", (mod, conf) -> {
                    // no config
                },
                k -> getSPIConf(mScoped.TriggersConfigs, k),
                mTriggers::removeAll);

        //// Dynamic loading: components whose class name is given in the configuration
        //// and which are instantiated through the ClassLoader.

        // The same virtual device class may be bound to different addresses, so it is
        // allowed to be loaded repeatedly through the ClassLoader.
        initDynamicBundles(mScoped.DevicesConfigs).forEach(c -> {
            final VirtualDevice it = ClassX.newObject(c.className);
            checkAndSetHardware(it, c);
            it.onInit(c.initArgs, mScoped);
            addVirtualDevice(it);
        });

        // Trigger classes can be reused in the same way.
        initDynamicBundles(mScoped.TriggersConfigs).forEach(c -> {
            final VirtualTrigger vt = ClassX.newObject(c.className);
            vt.onInit(c.initArgs, mScoped);
            addTrigger(vt);
        });

        //// Log a summary of the initialization result

        // Devices are owned by the pipelines, so gather them across all pipelines.
        final List<VirtualDevice> devices = mPipelines.values().stream()
                .flatMap(h -> h.getDevices().stream())
                .collect(Collectors.toList());

        LOGGER.info("已加载 Interceptors: {}", this.mInterceptors.size());
        logEachMods(mScoped, this.mInterceptors, "Interceptor");

        LOGGER.info("已加载 Devices: {}", devices.size());
        logEachMods(mScoped, devices, "Devices");

        LOGGER.info("已加载 Drivers: {}", this.mDrivers.size());
        logEachMods(mScoped, this.mDrivers, "Driver");

        LOGGER.info("已加载 Triggers: {}", this.mTriggers.size());
        logEachMods(mScoped, this.mTriggers, "Trigger");

        LOGGER.info("已加载 Plugins: {}", this.mPlugins.size());
        logEachMods(mScoped, this.mPlugins, "Plugin");

        LOGGER.info("Engine初始化...OK");

    }

    /**
     * Starts the engine: the reactor first, then plugins, pipelines and drivers, and
     * finally the triggers, which receive the context invoker used to submit events.
     */
    public void start() {
        LOGGER.info("Engine启动...");

        mReactor.start();
        mPlugins.forEach(mScoped::startTakes);
        mPipelines.values().forEach(mScoped::startTakes);
        mDrivers.forEach(mScoped::startTakes);
        mTriggers.forEach(vt -> mScoped.takes(nameOf(vt) + ".Start",
                () -> vt.onStart(mScoped, mInvoker)));

        LOGGER.info("Engine启动...OK");
    }

    /**
     * Stops the engine: triggers first, then pipelines, drivers and plugins; afterwards
     * the scoped object is released and the reactor is shut down.
     */
    public void shutdown() {
        LOGGER.info("Engine停止...");

        mTriggers.forEach(vt -> mScoped.takes(nameOf(vt) + ".Stop",
                () -> vt.onStop(mScoped, mInvoker)));
        mPipelines.values().forEach(mScoped::stopTakes);
        mDrivers.forEach(mScoped::stopTakes);
        mPlugins.forEach(mScoped::stopTakes);
        mScoped.release();
        mReactor.shutdown();

        LOGGER.info("Engine停止...OK");
    }

    /**
     * @return the version string reported by the scoped object
     */
    public String version() {
        return mScoped.version();
    }

    ////

    /**
     * Tells whether any topic expression of the given filter matches the topic.
     *
     * @param tf    component exposing the topic expressions to test
     * @param topic topic of the event being dispatched
     * @return {@code true} if at least one expression matches, {@code false} otherwise
     *         (including when the filter has no topics)
     */
    private static boolean isTopicMatches(TopicFilter tf, String topic) {
        return tf.getTopics().stream().anyMatch(t -> t.matches(topic));
    }

}
